< Back
## Kafka (in docker container) + Spring. This post is a refined version of the Sping document's getting start guid. It make sure it can run on a localhost successfully. >Spring doc getting start https://docs.spring.io/spring-kafka/docs/current/reference/html/#getting-started ## Architecture ```plantuml @startuml rectangle host { rectangle vm { rectangle container as zkc{ rectangle zookeeper as zk } rectangle container as kfc{ rectangle kafka as kf } zk <.left.> kf } rectangle consumer as cs rectangle producer as pd pd --> kfc : push message kfc --> cs : consume message } @enduml ``` >run kafka in docker: https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ From the architecture above. I have a vm running inside my host. To run `kafka` and `zookeeper` container use the following `docker-compose.yml` ```yaml version: "2" services: zookeeper: image: docker.io/bitnami/zookeeper:3.7 ports: - "2181:2181" volumes: - "zookeeper_data:/bitnami" environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: docker.io/bitnami/kafka:3 ports: - "9092:9092" - "9093:9093" volumes: - "kafka_data:/bitnami" environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://kafka-docker-machine-dev:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper volumes: zookeeper_data: driver: local kafka_data: driver: local ``` For the `KAFKA_CFG_LISTENERS` , we have 2 listeners the `CLIENT` and `EXTERNAL`. Kafka's broker can have one or more listeners attatched to a broker. Now let's start from scratch. Without `KAFKA_CFG_LISTENERS` configured as above, we would have only 1 listener to a borker that run inside a docker container. Therefore, when we connect to vm's ip with port `9092` , Kafka broker will return the target broker metadata where producer/consumer can write/read from. In docker case, it will return kafka's container id instead. ```text $ docker ps CONTAINER ID IMAGE ..... ab0fa12a3422 bitnami/kafka:3 ..... ``` like `ab0fa12a3422` . Hence, Spring will have no way to know if where `ab0fa12a3422:9092` is. Unless I mapped `ab0fa12a3422` to vm's ip in my host's hosts file then the host can resolve that `ab0fa12a3422` to vm's ip correctly and spring can started to connnect to the borker. Unfortunately, docker's container id is keep changing. We have to deal with this problem. `KAFKA_CFG_LISTENERS` is the answer. When Spring initiate a connection to a broker, in the initial state, broker would return docker's id, but if we configure kafka as docker-compose file above. We can now have a choice which listener we want to talk to; the `External` one. No matter what contianer id will be, the `EXTERNAL` lsitener will always return `kafka-docker-machine-dev` as per configured. Therefore, I just have to map `kafka-docker-machine-dev` in my host's hosts file. --- ## Spring boot app. add `Spring for Apache Kafka` when bootstrap from spring initializer. ### Consumer The `KafkaConsumerApplication.java` ```java package com.kone.sandbox.kafkaconsumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.TopicBuilder; @Slf4j @SpringBootApplication public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } @Bean public NewTopic topic() { return TopicBuilder.name("topic1").partitions(10).replicas(1).build(); } @KafkaListener(id = "myId", topics = "topic1") public void listen(String in) { log.info(in); } } ``` `application.yml` ```yaml spring: kafka: consumer: auto-offset-reset: earliest properties: bootstrap: servers: "kafka-docker-machine-dev:9093" admin: properties: bootstrap: servers: "kafka-docker-machine-dev:9093" ``` ### Producer `KafkaProducerApplication.java` ```java package com.kone.sandbox.kafkaproducer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.KafkaTemplate; import java.time.LocalTime; @Slf4j @SpringBootApplication public class KafkaProducerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerApplication.class, args); } @Bean public NewTopic topic() { return TopicBuilder.name("topic1").partitions(10).replicas(1).build(); } @Bean public ApplicationRunner runner(KafkaTemplate<String, String> template) { return args -> { template.send("topic1", "test:" + LocalTime.now()); }; } } ``` `applicaiton.yml` ```java spring: kafka: producer: auto-offset-reset: earliest properties: bootstrap: servers: "kafka-docker-machine-dev:9093" admin: properties: bootstrap: servers: "kafka-docker-machine-dev:9093" ``` **Result** ![output](https://i.imgur.com/2QEy1Vk.png)